Amplify Gen2 で DynamoDB Streams を使って DynamoDB テーブル間のデータ連携を行う

Amplify Gen2 で DynamoDB Streams を使って DynamoDB テーブル間のデータ連携を行う

Clock Icon2025.01.05

いわさです。

Amplify Gen2 を使って、DynamoDB でスコアを管理するアプリケーションを作成しています。

このアプリケーションでは、スコアが不定期に加算・減算されるイベントが発生します。
これをスコアアクティビティイベントと呼んでおり、外部から AppSync 経由でトリガーされています。構成図にするとこんな感じでしょうか。

before.png
スコアアクティビティイベントを受信するだけのアプリ

Amplify の Data コンポーネントでスコアアクティビティテーブルを定義するだけなので、以下のみで実現出来ます。

/amplify/data/resource.ts
import { type ClientSchema, a, defineData } from '@aws-amplify/backend';

const schema = a.schema({
  ScoreActivities: a
    .model({
      team_id: a.string().required(),
      score_change: a.integer().required(),
    })
    .authorization((allow) => [
      allow.ownerDefinedIn("team_id").identityClaim("custom:team_id"),
      allow.publicApiKey()
    ])
});

:

細切れにチームごとのスコアアクティビティイベントが発生するのですが、チームごとの合計スコアを管理したくなりました。スコアアクティビティテーブルをアプリケーションで集計することも出来るのですが、イベント購読やランキングなどを考えると「合計スコア」テーブルで管理出来ると良いのではと考えました。

そこで、今回は次のように DynamoDB Streams を経由して、スコアアクティビティイベント発生時に Lambda 関数経由で合計スコアテーブルを更新仕組みを実装してみたいと思います。
構成図で表現すると次のピンク色の枠の中を実装するイメージです。

after.png

TotalScore テーブルの実装

まずは合計スコアを管理するテーブル「TotalScore」を Data コンポーネント経由で定義します。
これは Amplify Gen2 でスキーマ定義を追加するだけなので簡単に実装が出来ます。

/amplify/data/resource.ts
import { type ClientSchema, a, defineData } from '@aws-amplify/backend';

const schema = a.schema({
  ScoreActivities: a
    .model({
      team_id: a.string().required(),
      score_change: a.integer().required(),
    })
    .authorization((allow) => [
      allow.ownerDefinedIn("team_id").identityClaim("custom:team_id"),
      allow.publicApiKey()
    ]),
  TotalScore: a
    .model({
      team_id: a.string().required(),
      total_score: a.integer().required(),
    })
    .identifier(['team_id'])
    .authorization((allow) => [
      allow.ownerDefinedIn("team_id").identityClaim("custom:team_id"),
      allow.publicApiKey()
    ]),
});

:

デプロイ後、Amplify コンソールのデータマネージャーから新しいテーブルが追加されていることを確認出来ました。

5616CA8F-9A78-443C-B43E-DAE1D20CEEFF.png

DynamoDB Streams から Lambda 関数をトリガー

続いて、ScoreActivities テーブルにアイテムが追加されたタイミングで TotalScore テーブルの合計スコア値を更新したいと思いますので、DynamoDB Streams 経由で Lambda 関数をトリガーします。

Amplify Gen2 では DyanamoDB Streams がデフォルトで有効になっており、Lambda 関数を定義してイベントマッピングを作成してやれば動作します。

https://docs.amplify.aws/react/build-a-backend/functions/examples/dynamo-db-stream/

Lambda 関数は先程定義した TotalScore へリクエストを送信する必要があるので、Amplify が生成する GraphQL エンドポイント URL と API キーを、Lambda 関数に引き渡します。

先日記事に書いたのですが、このパターンは循環依存が発生するケースになりますので、次の記事を参考に Lambda 関数は Amplify 管理外のカスタムリソースとして定義しました。

https://dev.classmethod.jp/articles/amplify-gen2-circular-lambda/

最終的な実装は次のような感じに。別途記事にしたいと思いますが StartingPosition の仕様については少し注意したほうが良いです。データ損失が起きる場合があるので。

amplify/backend.ts
import { defineBackend } from '@aws-amplify/backend';
import { auth } from './auth/resource';
import { data } from './data/resource';
import { preTokenGenerationV2 } from './auth/pre-token-generation-v2/resource';
import { Effect, PolicyStatement } from 'aws-cdk-lib/aws-iam';
import { EventSourceMapping, StartingPosition } from 'aws-cdk-lib/aws-lambda';

import { NodejsFunction} from 'aws-cdk-lib/aws-lambda-nodejs';
import * as url from 'node:url';

const backend = defineBackend({
  auth,
  data,
  preTokenGenerationV2,
});

const { cfnUserPool } = backend.auth.resources.cfnResources
cfnUserPool.addPropertyOverride("LambdaConfig.PreTokenGenerationConfig",{
  LambdaVersion: 'V2_0',
  LambdaArn: backend.preTokenGenerationV2.resources.lambda.functionArn,
});

const updateTotalScoreStack = backend.createStack('UpdateTotalScoreStack');
const funcitonUpdateTotalScoreOnActivity = new NodejsFunction(
  updateTotalScoreStack,
  'update-totalscore-on-activity',
  {
    entry: url.fileURLToPath(new URL('./functions/update-totalscore-on-activity/handler.ts', import.meta.url)),
    environment: {
      APPSYNC_ENDPOINT: backend.data.resources.cfnResources.cfnGraphqlApi.attrGraphQlUrl,
      APPSYNC_API_KEY: backend.data.resources.cfnResources.cfnApiKey?.attrApiKey || '',
    },
    initialPolicy: [
      new PolicyStatement({
        effect: Effect.ALLOW,
        actions: [
          "dynamodb:DescribeStream",
          "dynamodb:GetRecords",
          "dynamodb:GetShardIterator",
          "dynamodb:ListStreams",
        ],
        resources: ["*"],
      }),
    ],
  }
)
new EventSourceMapping(
  updateTotalScoreStack,
  "TotalScoreFunctionEventSourceMapping",
  {
    eventSourceArn: backend.data.resources.tables["ScoreActivities"].tableStreamArn,
    target: funcitonUpdateTotalScoreOnActivity,
    batchSize: 100,
    startingPosition: StartingPosition.LATEST,
  }
);

Lambda 関数から AppSync 経由で TotalScore を更新

最後に DynamoDB Streams からトリガーされた Lambda 関数で TotalScore テーブルに書き込みを行えば完了です。

今回は、次の記事を参考に Lambda 関数で Axios を使ってリクエストを送信します。記事ではデフォルトプロバイダーで SigV4 署名していますが、私のほうは諸事情により API キーを使いました。

https://dev.classmethod.jp/articles/amplify-gen2-appsync-mutation-lambda/

イベントで受信したスコアを TotalScore に加算します。該当キーのアイテムが存在しない場合は作成も行います。
コードは生成 AI に手伝ってもらいながら手直しを入れたものです。便利な世の中になったものだ。

amplify/functions/update-totalscore-on-activity/handler.ts
import axios, { AxiosError } from "axios";
import type { DynamoDBStreamHandler } from "aws-lambda";
import { Logger } from "@aws-lambda-powertools/logger";

type TotalScoreInput = {
  team_id: string;
  total_score: number;
};

type GraphQLError = {
  message: string;
  path: string[];
  errorType: string;
};

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "dynamodb-stream-handler",
});

// GraphQL操作の定義
const CREATE_TOTAL_SCORE = `
  mutation CreateTotalScore($input: CreateTotalScoreInput!) {
    createTotalScore(input: $input) {
      team_id
      total_score
    }
  }
`;

const UPDATE_TOTAL_SCORE = `
  mutation UpdateTotalScore($input: UpdateTotalScoreInput!) {
    updateTotalScore(input: $input) {
      team_id
      total_score
    }
  }
`;

const GET_TOTAL_SCORE = `
  query GetTotalScore($team_id: String!) {
    getTotalScore(team_id: $team_id) {
      team_id
      total_score
    }
  }
`;

async function executeGraphQLRequest(query: string, variables: any) {
  if (!process.env.APPSYNC_ENDPOINT || !process.env.APPSYNC_API_KEY) {
    throw new Error("Missing required environment variables: APPSYNC_ENDPOINT or APPSYNC_API_KEY");
  }

  const response = await axios.post(
    process.env.APPSYNC_ENDPOINT,
    {
      query,
      variables,
    },
    {
      headers: {
        'Content-Type': 'application/json',
        'x-api-key': process.env.APPSYNC_API_KEY,
      },
    }
  );

  if (response.data.errors) {
    throw new Error(`GraphQL Error: ${JSON.stringify(response.data.errors)}`);
  }

  return response.data;
}

async function updateOrCreateTotalScore(teamId: string, newTotalScore: number) {
  try {
    const updateResponse = await executeGraphQLRequest(
      UPDATE_TOTAL_SCORE,
      {
        input: {
          team_id: teamId,
          total_score: newTotalScore
        }
      }
    );
    logger.info('Successfully updated total score', { teamId, newTotalScore });
    return updateResponse;
  } catch (error) {
    if (error instanceof Error) {
      if (error.message.includes('ConditionalCheckFailedException')) {
        logger.info('Record not found, creating new total score', { teamId, newTotalScore });
        const createResponse = await executeGraphQLRequest(
          CREATE_TOTAL_SCORE,
          {
            input: {
              team_id: teamId,
              total_score: newTotalScore
            }
          }
        );
        return createResponse;
      }
    }
    throw error;
  }
}

export const handler: DynamoDBStreamHandler = async (event) => {
  const batchItemFailures: Array<{itemIdentifier: string}> = [];
  
  logger.info('Starting to process DynamoDB Stream records', { 
    recordCount: event.Records.length 
  });

  for (const record of event.Records) {
    try {
      if (record.eventName !== 'INSERT') {
        logger.info(`Skipping non-INSERT event: ${record.eventName}`);
        continue;
      }

      const newImage = record.dynamodb?.NewImage;
      if (!newImage) {
        logger.warn('No NewImage in record, skipping');
        continue;
      }

      const teamId = newImage.team_id?.S;
      const scoreChange = parseInt(newImage.score_change?.N ?? '0');

      if (!teamId || scoreChange === 0) {
        logger.warn('Invalid record data', { teamId, scoreChange });
        continue;
      }

      const getTotalScoreResponse = await executeGraphQLRequest(
        GET_TOTAL_SCORE,
        { team_id: teamId }
      );

      const currentScore = getTotalScoreResponse.data.getTotalScore?.total_score ?? 0;
      const newTotalScore = currentScore + scoreChange;

      const result = await updateOrCreateTotalScore(teamId, newTotalScore);

      logger.info('Successfully processed score change', {
        teamId,
        scoreChange,
        currentScore,
        newTotalScore,
        operation: result.data.createTotalScore ? 'create' : 'update'
      });

    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      const errorStack = error instanceof Error ? error.stack : undefined;

      logger.error('Error processing record', {
        error: errorMessage,
        stack: errorStack,
        recordId: record.eventID
      });
      
      if (record.eventID) {
        batchItemFailures.push({
          itemIdentifier: record.eventID
        });
      }
    }
  }
  
  return { batchItemFailures };
};

実行結果

ではデプロイ後に試してみます。
まずは ScoreActivities テーブルで、team1 に対して 1000 を加算します。

246F144B-256F-4A2F-B952-3F516656CE41.png

TotalScore を確認してみると、team1 の合計スコアが 1000 になっていることを確認出来ました。新規作成の部分うまく動いていそうです。

C0589BF1-05FB-487E-856B-A0797C750EE4.png

続いて同じチームに対して、さらに 200 加算します。

1BA31C1E-8686-4F33-93C5-0BF57263BB22.png

1200 に加算されました。更新も出来てますね。

804C8C59-21A3-425B-8BA6-EA0C6BE9DCA6.png

続いて違うチームのスコアを発生させてみました。

31616A32-65A7-4628-B5BF-3FA62B9F274D.png

既存のチームには影響なく新しいチームのアイテムが作成されました。
この時点では team1 のほうがスコアが高いです。

E46CD4AF-6357-4375-A2CD-08F94A30CFB9.png

ここで team1 に対して減算イベントを発生させます。-300 です。

CE305C28-AB28-4FEE-BF07-042F4901E42F.png

減算も出来てますね。team2 のほうが合計スコアが大きくなりました。

0194A02B-3471-44D6-B64B-47C447971030.png

さいごに

本日は Amplify Gen2 で DynamoDB Streams を使って DynamoDB テーブル間のデータ連携を行う方法として、 AppSync スキーマ間の連携に DynamoDB Streams 経由を使ってみました。まぁ悪くないのではと思いますがどうでしょう。

Amplify + TypeScript 力があまり高くないもので、生成 AI ベースの Lambda 関数となっているので、見る人が見ると「なんだこのコードは」となるのかもしれません。そのあたりは大目に見てください。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.